from collections import OrderedDict
from functools import partial
from typing import Callable, Optional

import torch.nn as nn
import torch
from torch import Tensor
import numpy
import torchaudio
import torch.nn.functional as F


def drop_path(x, drop_prob: float = 0., training: bool = False):
    """
    Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks).
    "Deep Networks with Stochastic Depth", https://arxiv.org/pdf/1603.09382.pdf

    Adapted from rwightman's pytorch-image-models:
    https://github.com/rwightman/pytorch-image-models/blob/master/timm/models/layers/drop.py#L140

    Args:
        x: input tensor; its first dimension is treated as the sample dimension.
        drop_prob: probability of zeroing a whole sample. ``None`` and ``0``
            both disable dropping.
        training: dropping is only applied when this is True.

    Returns:
        ``x`` itself when dropping is disabled; otherwise a new tensor in which
        every sample is either zeroed or scaled by ``1 / (1 - drop_prob)``.
    """
    # `not drop_prob` also covers None, which is DropPath's constructor default.
    if not training or not drop_prob:
        return x
    keep_prob = 1 - drop_prob
    if keep_prob <= 0.:
        # Everything is dropped. Rescaling by 1 / keep_prob here would divide
        # by zero and turn the output into NaN (inf * 0).
        return torch.zeros_like(x)
    # One Bernoulli draw per sample, broadcast over all remaining dimensions
    # (works with tensors of any rank, not just 2D ConvNets).
    mask_shape = (x.shape[0],) + (1,) * (x.ndim - 1)
    mask = keep_prob + torch.rand(mask_shape, dtype=x.dtype, device=x.device)
    mask.floor_()  # binarize: 1 with probability keep_prob, else 0
    return x.div(keep_prob) * mask


class DropPath(nn.Module):
    """
    Drop paths (Stochastic Depth) per sample  (when applied in main path of residual blocks).
    "Deep Networks with Stochastic Depth", https://arxiv.org/pdf/1603.09382.pdf

    Module wrapper around ``drop_path`` that follows the module's
    ``training`` flag.

    Args:
        drop_prob: probability of dropping a sample; ``None`` (the default)
            means "never drop" and is stored as ``0.``.
    """

    def __init__(self, drop_prob=None):
        super().__init__()
        # Normalise None to 0. so the value forwarded to drop_path is always
        # numeric; a raw None would reach `1 - drop_prob` in training mode.
        self.drop_prob = 0. if drop_prob is None else drop_prob

    def forward(self, x):
        return drop_path(x, self.drop_prob, self.training)


class ConvBNAct(nn.Module):
    """Bias-free ``Conv2d`` followed by a normalisation layer and an activation.

    Args:
        in_planes: number of input channels.
        out_planes: number of output channels.
        kernel_size: convolution kernel size; padding is ``(kernel_size - 1) // 2``.
        stride: convolution stride.
        groups: convolution groups.
        norm_layer: factory called with ``out_planes``; ``nn.BatchNorm2d`` when None.
        activation_layer: zero-argument factory; ``nn.SiLU`` when None.
    """

    def __init__(self,
                 in_planes: int,
                 out_planes: int,
                 kernel_size: int = 3,
                 stride: int = 1,
                 groups: int = 1,
                 norm_layer: Optional[Callable[..., nn.Module]] = None,
                 activation_layer: Optional[Callable[..., nn.Module]] = None):
        super().__init__()

        make_norm = nn.BatchNorm2d if norm_layer is None else norm_layer
        # SiLU is an alias of Swish (torch>=1.7)
        make_act = nn.SiLU if activation_layer is None else activation_layer

        self.conv = nn.Conv2d(in_planes,
                              out_planes,
                              kernel_size,
                              stride=stride,
                              padding=(kernel_size - 1) // 2,
                              groups=groups,
                              bias=False)
        self.bn = make_norm(out_planes)
        self.act = make_act()

    def forward(self, x):
        return self.act(self.bn(self.conv(x)))


class SqueezeExcite(nn.Module):
    """Squeeze-and-Excitation channel gate.

    The input is averaged over dimensions 2 and 3, passed through a
    1x1 reduce conv -> SiLU -> 1x1 expand conv -> Sigmoid, and the resulting
    per-channel scale is multiplied back onto the input.

    Args:
        input_c: channel count of the enclosing block's input; the bottleneck
            width is derived from it.
        expand_c: channel count of the tensor this module receives.
        se_ratio: bottleneck width as a fraction of ``input_c``.
    """

    def __init__(self,
                 input_c: int,  # block input channel
                 expand_c: int,  # block expand channel
                 se_ratio: float = 0.25):
        super().__init__()
        # int() truncates towards zero, so a small input_c would otherwise
        # produce a zero-channel bottleneck; keep at least one channel.
        squeeze_c = max(1, int(input_c * se_ratio))
        self.conv_reduce = nn.Conv2d(expand_c, squeeze_c, 1)
        self.act1 = nn.SiLU()  # alias Swish
        self.conv_expand = nn.Conv2d(squeeze_c, expand_c, 1)
        self.act2 = nn.Sigmoid()

    def forward(self, x: Tensor) -> Tensor:
        scale = x.mean((2, 3), keepdim=True)  # squeeze: mean over dims 2 and 3
        scale = self.conv_reduce(scale)
        scale = self.act1(scale)
        scale = self.conv_expand(scale)
        scale = self.act2(scale)  # per-channel gate in (0, 1)
        return scale * x


class MBConv(nn.Module):
    """MBConv block: 1x1 expansion -> depth-wise conv -> optional SE -> 1x1 linear projection.

    A residual shortcut is added when ``stride == 1`` and
    ``input_c == out_c``; on that path the block output additionally goes
    through ``DropPath`` when ``drop_rate > 0``.

    Args:
        kernel_size: kernel size of the depth-wise convolution.
        input_c: number of input channels.
        out_c: number of output channels.
        expand_ratio: channel multiplier of the expansion conv; must not be 1.
        stride: stride of the depth-wise convolution, 1 or 2.
        se_ratio: Squeeze-Excite ratio; values <= 0 disable SE.
        drop_rate: DropPath probability applied on the residual branch.
        norm_layer: factory for the normalisation layers.

    Raises:
        ValueError: if ``stride`` is not 1 or 2, or if ``expand_ratio`` is 1.
    """

    def __init__(self,
                 kernel_size: int,
                 input_c: int,
                 out_c: int,
                 expand_ratio: int,
                 stride: int,
                 se_ratio: float,
                 drop_rate: float,
                 norm_layer: Callable[..., nn.Module]):
        super(MBConv, self).__init__()

        if stride not in [1, 2]:
            raise ValueError("illegal stride value.")
        # In EfficientNetV2 an MBConv never has expansion == 1, so the
        # point-wise expansion conv below is always built. Raise explicitly
        # (instead of assert) so the check also runs under `python -O`.
        if expand_ratio == 1:
            raise ValueError("MBConv requires expand_ratio != 1.")

        self.has_shortcut = (stride == 1 and input_c == out_c)

        activation_layer = nn.SiLU  # alias Swish
        expanded_c = input_c * expand_ratio

        # Point-wise expansion
        self.expand_conv = ConvBNAct(input_c,
                                     expanded_c,
                                     kernel_size=1,
                                     norm_layer=norm_layer,
                                     activation_layer=activation_layer)

        # Depth-wise convolution
        self.dwconv = ConvBNAct(expanded_c,
                                expanded_c,
                                kernel_size=kernel_size,
                                stride=stride,
                                groups=expanded_c,
                                norm_layer=norm_layer,
                                activation_layer=activation_layer)

        self.se = SqueezeExcite(input_c, expanded_c, se_ratio) if se_ratio > 0 else nn.Identity()

        # Point-wise linear projection; no activation here, hence Identity.
        self.project_conv = ConvBNAct(expanded_c,
                                      out_planes=out_c,
                                      kernel_size=1,
                                      norm_layer=norm_layer,
                                      activation_layer=nn.Identity)

        self.out_channels = out_c

        # The DropPath layer only exists when the shortcut connection is used.
        self.drop_rate = drop_rate
        if self.has_shortcut and drop_rate > 0:
            self.dropout = DropPath(drop_rate)

    def forward(self, x: Tensor) -> Tensor:
        result = self.expand_conv(x)
        result = self.dwconv(result)
        result = self.se(result)
        result = self.project_conv(result)

        if self.has_shortcut:
            # self.dropout is only defined when drop_rate > 0 (see __init__).
            if self.drop_rate > 0:
                result = self.dropout(result)
            result += x

        return result


class FusedMBConv(nn.Module):
    """Fused-MBConv block: a regular conv replaces the expansion + depth-wise pair.

    With ``expand_ratio != 1`` the block is a ``kernel_size`` expansion conv
    followed by a 1x1 linear projection. With ``expand_ratio == 1`` it is a
    single ``kernel_size`` conv with activation. A residual shortcut is
    added when ``stride == 1`` and ``input_c == out_c``; on that path the
    block output additionally goes through ``DropPath`` when ``drop_rate > 0``.

    Args:
        kernel_size: kernel size of the main convolution.
        input_c: number of input channels.
        out_c: number of output channels.
        expand_ratio: channel multiplier of the expansion conv.
        stride: stride of the main convolution, 1 or 2.
        se_ratio: must be 0; this block has no Squeeze-Excite module.
        drop_rate: DropPath probability applied on the residual branch.
        norm_layer: factory for the normalisation layers.

    Raises:
        ValueError: if ``stride`` is not 1 or 2, or if ``se_ratio`` is not 0.
    """

    def __init__(self,
                 kernel_size: int,
                 input_c: int,
                 out_c: int,
                 expand_ratio: int,
                 stride: int,
                 se_ratio: float,
                 drop_rate: float,
                 norm_layer: Callable[..., nn.Module]):
        super(FusedMBConv, self).__init__()

        # Explicit raises (instead of assert) so the checks also run under `python -O`.
        if stride not in [1, 2]:
            raise ValueError("illegal stride value.")
        if se_ratio != 0:
            raise ValueError("FusedMBConv does not support Squeeze-Excite; se_ratio must be 0.")

        self.has_shortcut = stride == 1 and input_c == out_c
        self.drop_rate = drop_rate

        self.has_expansion = expand_ratio != 1

        activation_layer = nn.SiLU  # alias Swish
        expanded_c = input_c * expand_ratio

        # An expansion conv exists only when expand_ratio != 1.
        if self.has_expansion:
            # Expansion convolution
            self.expand_conv = ConvBNAct(input_c,
                                         expanded_c,
                                         kernel_size=kernel_size,
                                         stride=stride,
                                         norm_layer=norm_layer,
                                         activation_layer=activation_layer)

            # Linear projection: no activation.
            self.project_conv = ConvBNAct(expanded_c,
                                          out_c,
                                          kernel_size=1,
                                          norm_layer=norm_layer,
                                          activation_layer=nn.Identity)
        else:
            # Only project_conv exists; here it does carry the activation.
            self.project_conv = ConvBNAct(input_c,
                                          out_c,
                                          kernel_size=kernel_size,
                                          stride=stride,
                                          norm_layer=norm_layer,
                                          activation_layer=activation_layer)

        self.out_channels = out_c

        # The DropPath layer only exists when the shortcut connection is used.
        if self.has_shortcut and drop_rate > 0:
            self.dropout = DropPath(drop_rate)

    def forward(self, x: Tensor) -> Tensor:
        if self.has_expansion:
            result = self.expand_conv(x)
            result = self.project_conv(result)
        else:
            result = self.project_conv(x)

        if self.has_shortcut:
            # self.dropout is only defined when drop_rate > 0 (see __init__).
            if self.drop_rate > 0:
                result = self.dropout(result)

            result += x

        return result


# class EfficientNetV2_mel(nn.Module):
#     def __init__(self,
#                  model_cnf: list,
#                  num_classes: int = 1000,
#                  num_features: int = 1280,
#                  dropout_rate: float = 0.2,
#                  drop_connect_rate: float = 0.2,
#                  # num_filters=[16, 32, 64, 128],
#                  nOut=512,
#                  encoder_type='SAP',
#                  n_mels=40,
#                  log_input=True,
#                  **kwargs
#                  ):
#         super(EfficientNetV2_mel, self).__init__()
#
#         self.encoder_type = encoder_type
#         self.n_mels = n_mels
#         self.log_input = log_input
#         self.instancenorm = nn.InstanceNorm1d(n_mels)
#         self.torchfb = torchaudio.transforms.MelSpectrogram(sample_rate=16000, n_fft=512, win_length=400,
#                                                             hop_length=160, window_fn=torch.hamming_window,
#                                                             n_mels=n_mels)
#
#         # if self.encoder_type == "SAP":
#         #     self.sap_linear = nn.Linear(num_filters[3] * block.expansion, num_filters[3] * block.expansion)
#         #     self.attention = self.new_parameter(num_filters[3] * block.expansion, 1)
#         #     out_dim = num_filters[3] * block.expansion
#         # elif self.encoder_type == "ASP":
#         #     self.sap_linear = nn.Linear(num_filters[3] * block.expansion, num_filters[3] * block.expansion)
#         #     self.attention = self.new_parameter(num_filters[3] * block.expansion, 1)
#         #     out_dim = num_filters[3] * block.expansion * 2
#         # else:
#         #     raise ValueError('Undefined encoder')
#
#         for cnf in model_cnf:
#             assert len(cnf) == 8
#
#         norm_layer = partial(nn.BatchNorm2d, eps=1e-3, momentum=0.1)
#
#         stem_filter_num = model_cnf[0][4]
#
#         # 更改 3-》1
#         self.stem = ConvBNAct(1,
#                               stem_filter_num,
#                               kernel_size=3,
#                               stride=2,
#                               norm_layer=norm_layer)  # 激活函数默认是SiLU
#
#         total_blocks = sum([i[0] for i in model_cnf])
#         block_id = 0
#         blocks = []
#         for cnf in model_cnf:
#             repeats = cnf[0]
#             op = FusedMBConv if cnf[-2] == 0 else MBConv
#             for i in range(repeats):
#                 blocks.append(op(kernel_size=cnf[1],
#                                  input_c=cnf[4] if i == 0 else cnf[5],
#                                  out_c=cnf[5],
#                                  expand_ratio=cnf[3],
#                                  stride=cnf[2] if i == 0 else 1,
#                                  se_ratio=cnf[-1],
#                                  drop_rate=drop_connect_rate * block_id / total_blocks,
#                                  norm_layer=norm_layer))
#                 block_id += 1
#         self.blocks = nn.Sequential(*blocks)
#
#         head_input_c = model_cnf[-1][-3]
#         head = OrderedDict()
#
#         head.update({"project_conv": ConvBNAct(head_input_c,
#                                                num_features,
#                                                kernel_size=1,
#                                                norm_layer=norm_layer)})  # 激活函数默认是SiLU
#
#         head.update({"avgpool": nn.AdaptiveAvgPool2d(1)})
#         head.update({"flatten": nn.Flatten()})
#
#         if dropout_rate > 0:
#             head.update({"dropout": nn.Dropout(p=dropout_rate, inplace=True)})
#         head.update({"classifier": nn.Linear(num_features, num_classes)})
#
#         self.head = nn.Sequential(head)
#
#         # initial weights
#         for m in self.modules():
#             if isinstance(m, nn.Conv2d):
#                 nn.init.kaiming_normal_(m.weight, mode="fan_out")
#                 if m.bias is not None:
#                     nn.init.zeros_(m.bias)
#             elif isinstance(m, nn.BatchNorm2d):
#                 nn.init.ones_(m.weight)
#                 nn.init.zeros_(m.bias)
#             elif isinstance(m, nn.Linear):
#                 nn.init.normal_(m.weight, 0, 0.01)
#                 nn.init.zeros_(m.bias)
#
#     def forward(self, x: Tensor) -> Tensor:
#
#         # with torch.no_grad():
#         #     with torch.cuda.amp.autocast(enabled=False):
#         #         x = self.torchfb(x) + 1e-6
#         #         if self.log_input: x = x.log()
#         #         x = self.instancenorm(x).unsqueeze(1).detach()
#         # print('x.shape', np.shape(x))
#
#         x = self.stem(x)
#         # print(x.shape)
#         x = self.blocks(x)
#
#         print(x.shape)
#         x = self.head(x)
#         print(x.shape)
#
#         return x

class EfficientNetV2_mel(nn.Module):
    """EfficientNetV2 backbone with a log-mel front-end, producing fixed-size embeddings.

    ``forward`` computes a mel spectrogram from its input, instance-normalises
    it, runs it through stem -> blocks -> head (1x1 conv, global average
    pooling, flatten, optional dropout) and projects to ``nOut`` dimensions
    with a linear layer.

    Each row of ``model_cnf`` has exactly eight entries:
    ``[repeat, kernel, stride, expansion, in_c, out_c, operator, se_ratio]``.
    ``operator == 0`` selects ``FusedMBConv``, anything else ``MBConv``.

    Args:
        model_cnf: list of stage configurations as described above.
        dropout_rate: dropout probability in the head; <= 0 disables it.
        drop_connect_rate: maximum DropPath probability; the per-block value
            grows linearly with the block index.
        num_features: output channels of the head's 1x1 conv and input width
            of the final linear layer.
        num_filters: channel sizes used for ``conv1``/``bn1`` and for the ASP
            attention layers.
        nOut: embedding size.
        encoder_type: ``'SAP'`` or ``'ASP'``. It only selects the sizes of the
            attention layers, which ``forward`` does not use.
        n_mels: number of mel bins.
        log_input: take the logarithm of the mel spectrogram when True.
        **kwargs: accepted and ignored.

    Raises:
        ValueError: for an unknown ``encoder_type`` or a ``model_cnf`` row
            whose length is not 8.
    """

    def __init__(self,
                 model_cnf: list,
                 dropout_rate: float = 0.2,
                 drop_connect_rate: float = 0.2,
                 num_features: int = 1280,
                 num_filters=(32, 64, 128, 256),
                 nOut=512,
                 encoder_type='SAP',
                 n_mels=40,
                 log_input=True, **kwargs):
        super(EfficientNetV2_mel, self).__init__()

        print('Embedding size is %d, encoder %s.' % (nOut, encoder_type))

        self.inplanes = num_filters[0]
        self.encoder_type = encoder_type
        self.n_mels = n_mels
        self.log_input = log_input

        # NOTE: conv1 / bn1 / relu are not used by forward(). They stay
        # registered so state_dict keys and seeded initialisation are unchanged.
        self.conv1 = nn.Conv2d(1, num_filters[0], kernel_size=7, stride=(2, 1), padding=3,
                               bias=False)
        self.bn1 = nn.BatchNorm2d(num_filters[0])
        self.relu = nn.ReLU(inplace=True)

        # Mel front-end, applied inside forward().
        self.instancenorm = nn.InstanceNorm1d(n_mels)
        self.torchfb = torchaudio.transforms.MelSpectrogram(sample_rate=16000, n_fft=512, win_length=400,
                                                            hop_length=160, window_fn=torch.hamming_window,
                                                            n_mels=n_mels)

        # NOTE: the attention layers below are not used by forward() either;
        # the head pools with a plain global average.
        if self.encoder_type == "SAP":
            self.sap_linear = nn.Linear(640, 160)
            self.attention = self.new_parameter(160, 1)
        elif self.encoder_type == "ASP":
            self.sap_linear = nn.Linear(num_filters[3] * 4, num_filters[3] * 4)
            self.attention = self.new_parameter(num_filters[3] * 4, 1)
        else:
            raise ValueError('Undefined encoder')

        # The head emits `num_features` values per sample, so the projection
        # must take that width rather than a hard-coded 1280.
        self.fc = nn.Linear(num_features, nOut)

        # First initialisation pass. Only conv1 / bn1 match at this point and
        # the final loop below re-initialises them; the pass is kept so the
        # sequence of random draws (and thus seeded weights) is unchanged.
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

        # ---- EfficientNetV2 backbone ----
        for cnf in model_cnf:
            if len(cnf) != 8:
                raise ValueError("each model_cnf entry must have exactly 8 values, got %d" % len(cnf))

        norm_layer = partial(nn.BatchNorm2d, eps=1e-3, momentum=0.1)

        stem_filter_num = model_cnf[0][4]

        # Single input channel (spectrogram) instead of 3 (RGB image).
        self.stem = ConvBNAct(1,
                              stem_filter_num,
                              kernel_size=3,
                              stride=2,
                              norm_layer=norm_layer)  # default activation is SiLU

        total_blocks = sum(cnf[0] for cnf in model_cnf)
        block_id = 0
        blocks = []
        for cnf in model_cnf:
            repeats = cnf[0]
            op = FusedMBConv if cnf[-2] == 0 else MBConv
            for i in range(repeats):
                # Only the first block of a stage changes channels / applies the stride.
                blocks.append(op(kernel_size=cnf[1],
                                 input_c=cnf[4] if i == 0 else cnf[5],
                                 out_c=cnf[5],
                                 expand_ratio=cnf[3],
                                 stride=cnf[2] if i == 0 else 1,
                                 se_ratio=cnf[-1],
                                 drop_rate=drop_connect_rate * block_id / total_blocks,
                                 norm_layer=norm_layer))
                block_id += 1
        self.blocks = nn.Sequential(*blocks)

        head_input_c = model_cnf[-1][-3]  # out_c of the last stage
        head = OrderedDict()

        head.update({"project_conv": ConvBNAct(head_input_c,
                                               num_features,
                                               kernel_size=1,
                                               norm_layer=norm_layer)})  # default activation is SiLU

        head.update({"avgpool": nn.AdaptiveAvgPool2d(1)})
        head.update({"flatten": nn.Flatten()})

        if dropout_rate > 0:
            head.update({"dropout": nn.Dropout(p=dropout_rate, inplace=True)})

        self.head = nn.Sequential(head)

        # Final weight initialisation over every registered module.
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode="fan_out")
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.ones_(m.weight)
                nn.init.zeros_(m.bias)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.zeros_(m.bias)

    def new_parameter(self, *size):
        """Return a new ``nn.Parameter`` of shape ``size`` initialised with Xavier-normal."""
        out = nn.Parameter(torch.FloatTensor(*size))
        nn.init.xavier_normal_(out)
        return out

    def forward(self, x):
        """Compute embeddings for a batch.

        Args:
            x: input passed straight to the mel-spectrogram transform.
                Assumed to be raw waveforms of shape (batch, samples) sampled
                at 16 kHz - TODO confirm against the data loader.

        Returns:
            Tensor whose last dimension is ``nOut``.
        """
        # Feature extraction runs without gradients and with autocast off.
        with torch.no_grad():
            with torch.cuda.amp.autocast(enabled=False):
                x = self.torchfb(x) + 1e-6  # offset keeps log() finite
                if self.log_input:
                    x = x.log()
                # unsqueeze(1) adds the single channel dimension the stem expects
                x = self.instancenorm(x).unsqueeze(1).detach()

        x = self.stem(x)
        x = self.blocks(x)
        x = self.head(x)  # 1x1 conv -> global average pool -> flatten (-> dropout)

        # Already flat after the head; kept as a guard so fc always sees 2-D input.
        x = x.view(x.size()[0], -1)
        x = self.fc(x)
        return x


# def MainModel(num_classes: int = 1000):
#     """
#     EfficientNetV2
#     https://arxiv.org/abs/2104.00298
#     """
#     # train_size: 384, eval_size: 480
#
#     # repeat, kernel, stride, expansion, in_c, out_c, operator, se_ratio
#     model_config = [[4, 3, 1, 1, 32, 32, 0, 0],
#                     [7, 3, 2, 4, 32, 64, 0, 0],
#                     [7, 3, 2, 4, 64, 96, 0, 0],
#                     [10, 3, 2, 4, 96, 192, 1, 0.25],
#                     [19, 3, 1, 6, 192, 224, 1, 0.25],
#                     [25, 3, 2, 6, 224, 384, 1, 0.25],
#                     [7, 3, 1, 6, 384, 640, 1, 0.25]]
#
#     model = EfficientNetV2_mel(model_cnf=model_config,
#                                num_classes=num_classes,
#                                dropout_rate=0.4)
#     print('inti OK!!!')
#     exit()
#     return model

def MainModel(nOut=256, **kwargs):
    """Build the default ``EfficientNetV2_mel`` speaker-embedding model.

    Args:
        nOut: embedding size.
        **kwargs: forwarded to ``EfficientNetV2_mel``. ``model_cnf``,
            ``num_filters`` and ``dropout_rate`` fall back to this factory's
            defaults when not supplied.

    Returns:
        The constructed ``EfficientNetV2_mel`` instance.
    """
    # Columns: repeat, kernel, stride, expansion, in_c, out_c, operator, se_ratio
    model_config = [[2, 3, 1, 1, 24, 24, 0, 0],
                    [4, 3, 2, 4, 24, 48, 0, 0],
                    [4, 3, 2, 4, 48, 64, 0, 0],
                    [6, 3, 2, 4, 64, 128, 1, 0.25],
                    [9, 3, 1, 6, 128, 160, 1, 0.25],
                    [15, 3, 2, 6, 160, 256, 1, 0.25]]

    # setdefault instead of explicit keywords: a caller that passes one of
    # these in **kwargs overrides the default rather than triggering
    # "got multiple values for keyword argument".
    kwargs.setdefault('model_cnf', model_config)
    kwargs.setdefault('num_filters', [32, 64, 128, 256])
    kwargs.setdefault('dropout_rate', 0.4)

    return EfficientNetV2_mel(nOut=nOut, **kwargs)


if __name__ == '__main__':
    # Smoke test: build the default model and run one forward pass.
    # Fall back to CPU so the script also runs on machines without a GPU.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = MainModel().to(device)

    # forward() begins with a MelSpectrogram transform (n_fft=512), so the
    # input must be waveforms, not precomputed features. Use one second of
    # 16 kHz audio per sample.
    x = torch.rand(4, 16000, device=device)
    r = model(x)
    print(r.shape)
